跳到主要内容

Java 集合类学习 Queue 队列

Queue 接口

Queue 表示队列,在Java中,一般是FIFO(先进先出)的队列,但也有不是先进先出的队列,比如优先级队列(堆)

介绍下 Queue 接口的基本方法:

public interface Queue<E> extends Collection<E> {
// 添加一个元素到队列,在先进先出队列中,是添加到队列尾部
// 如果由于容量不足插入失败,则抛出异常,不会返回false
boolean add(E e);

// 和add方法一样,不过不同之处在于,这个方法添加失败一般是返回false,而不是抛出异常
boolean offer(E e);

// 移除队列尾部的元素(最后进来的元素),如果队列为空,则抛出异常,不返回false
E remove();

// 和remove方法一样,不过这个方法在队列为空时会返回false,不抛出异常
E poll();

// 查看队列头部的元素(最先进来的元素),如果队列为空,则抛出异常,不返回null
E element();

// 和element方法一样,不过这个在队列为空时,返回null,不抛出异常
E peek();
}

所有队列实现类一览

  1. ArrayDeque, (数组双端队列)
  2. PriorityQueue, (优先级队列)
  3. ConcurrentLinkedQueue, (基于链表的并发队列)
  4. DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
  5. ArrayBlockingQueue, (基于数组的并发阻塞队列)
  6. LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
  7. LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
  8. PriorityBlockingQueue, (带优先级的无界阻塞队列)
  9. SynchronousQueue (并发同步阻塞队列)

双端队列看 Deque 接口那篇笔记

add 和 offer 插入

add(E e)offer(E e) 的语义相同,都是向优先队列中插入元素,只是 Queue 接口规定二者对插入失败时的处理不同,前者在插入失败时抛出 异常,后则则会返回 false。

抛出的异常如下:

IllegalStateException –             如果由于容量限制此时无法添加元素
ClassCastException – 如果指定元素的类阻止它被添加到这个队列
NullPointerException – 如果指定元素为 null 且此队列不允许 null 元素
IllegalArgumentException – 如果此元素的某些属性阻止将其添加到此队列
UnsupportedOperationException – 如果此集合不支持添加操作

element 和 peek 查看

element()peek() 的语义完全相同,都是获取但不删除队首元素,也就是队列中权值最小的那个元素,二者唯一的区别是当方法失败时前者抛出异常(如果为空会抛出 NoSuchElementException 异常),后者返回 null

remove 和 poll 出队

remove()poll() 方法的语义也完全相同,都是获取并删除队首元素,区别是当方法失败时前者抛出异常(如果为空会抛出 NoSuchElementException 异常),后者返回 null。

非线程安全的实现类

只有这个 PriorityQueue 队列是非线程安全的实现类,所以单独拿出来讲(不过人家也不完全算队列,而是 “堆” 属于优先级队列)

PriorityQueue

PriorityQueue 比较特殊,它是优先级队列,不是 FIFO 的,而是根据元素的优先级,优先级高的或者低的先出队列,因为本质是一颗特殊的完全二叉树,所以它不允许存放 null 元素

具体细节看 PriorityQueue 的那篇笔记

非线程安全的 Queue 实现类就只有这一个(不包括 Deque 的实现类,Deque 拓展了 Queue 接口),其他的大部分都是一些内部类

下面就开始讲本篇的重点:阻塞队列和非阻塞队列

阻塞队列和非阻塞队列

在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。

在并发队列上 JDK 提供了两套实现,一个是以 ConcurrentLinkedQueue 为代表的高性能队列非阻塞,一个是以 BlockingQueue 接口为代表的阻塞队列,无论哪种都继承自 Queue。

使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环 CAS 的方式来实现。

阻塞队列

阻塞队列(即 BlockingQueue 家族)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

阻塞队列(Blocking queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果 Queue 已经满了,put 方法会被阻塞直到有空间可用;如果 Queue 是空的,那么 take 方法会被阻塞,直到有元素可用。Queue 的长度可以有限,也可以无限;无限的 Queue 永远不会充满,所以它的 put 方法永远不会阻塞。

阻塞式队列工作流程:

  • 入列(存):阻塞式队列,如果存放的队列超出队列的总数,是时候会进行等待(阻塞)。当队列达到总数的时候,入列(生产者)会进行阻塞。这时候只有当消费者消费了队列中的队列之后,生产者才可以继续往队列中存放队列。

  • 出列(取):如果获取队列为空的情况下,这时候也会进行等待(阻塞)。这时候队列中没有队列,消费者无法消费队列,只有生产者往对队列中存放队列之后,消费者才可以进行消费。

队列中的队列如果被消费了就会从队列中删除掉。

阻塞队列的应用场景

阻塞队列常用于生产者和消费者的场景(生产者-消费者设计模式),生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

补充知识:生产者-消费者设计模式:该模式不会添加一个任务便立即处理,而是把任务放在于一个任务清单(TODO)中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。

最常见的生产者-消费者设计是将线程池与工作队列相结合。

阻塞队列的优缺点

优点:

阻塞队列简化了消费者的编码,因为 take 会保持阻塞直到可用数据出现。但如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put 方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。

有界队列(有 Max Size)是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。

缺点:

虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了

基于阻塞的算法也会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞 I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。

BlockingQueue 各个实现类

类库中包含一些 BlockingQueue 的实现,其中 LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 队列,与 LinkedList 和 ArrayList 相似,但是却拥有比同步 List 更好的并发性能。

PriorityBlockingQueue 是一个按优先级顺序排序的队列(堆)

最后一个 BlockingQueue 的实现是 SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为 SynchronousQueue 没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则 put 和 take 会一直阻止。SynchronousQueue 这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。

非阻塞队列

注意:一个线程的失败或挂起不影响其他线程的失败或挂起,这样的算法称为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。

非阻塞队列的代表 ConcurrentLinkedQueue 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 ConcurrentLinkedQueue 性能好于 BlockingQueue

入队和出队操作均利用 CAS(compare and set)更新,来保证元素的一致性,这样允许多个线程并发执行,并且不会因为加锁而阻塞线程,使得并发性能更好。但是 该队列不允许 null 元素。

非阻塞队列使用场景

TODO: 用到再更新...

ArrayBlockingQueue

ArrayBlockingQueue 是一个我们常用的典型的有边界(即有容量限制)的 阻塞队列,其内部的实现是基于数组来实现的,我们在创建时需要指定其长度,它的线程安全性由 ReentrantLock(可重入锁)来实现的。不允许存放 null 元素

public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}

如上所示,ArrayBlockingQueue 提供的构造函数中,我们需要指定队列的长度,同时我们也可以设置队列是都是公平的,当我们设置了容量后就不能再修改了,符合数组的特性,此队列按照先进先出(FIFO)的原则对元素进行排序。

和 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。

LinkedBlockingQueue

LinkedBlockingQueue 是一个 阻塞队列它既可以是有边界的,也可以是无边界的。不允许存放null元素

注意:队列容量 capacity 默认为 Integer.MAX_VALUE,也就是默认为无边界队列

SynchronousQueue

synchronousQueue 是一个不存储任何元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。同时它也支持公平锁和非公平锁。

synchronousQueue 的容量并不是 1,而是 0。因为它本身不会持有任何元素,它是直接传递的,synchronousQueue 会把元素从生产者直接传递给消费者,在这个过程中能够是不需要存储的

线程池 CachedThreadPool 就是利用了该队列。Executors.newCachedThreadPool(),因为这个线程池它的最大线程数是 Integer.MAX_VALUE,它是更具需求来创建线程,所有的线程都是临时线程,使用完后空闲60秒则被回收,

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级排序的无界阻塞队列,可以通过自定义实现 compareTo() 方法来指定元素的排序规则,或者通过构造器参数 Comparator 来指定排序规则。但是需要注意插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。

它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是无界非阻塞队列,基于链表实现,使用了无锁算法提高并发性能。不允许存放 null 元素

TODO: 用到再更新...

Reference

JAVA中常见的阻塞队列详解 阻塞队列与非阻塞队列区别应用场景 并发阻塞队列和非阻塞队列详解 java阻塞队列与非阻塞队列 Java Queue接口常用实现类